/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.sql.tests;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.IteratorInputFormat;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sources.InputFormatTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

import java.io.Serializable;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
 * End-to-end test for batch SQL queries.
 *
 * <p>The sources are generated and bounded. The result is always constant.
 *
 * <p>Parameters:
 * <ul>
 *   <li>-outputPath: output file path for the CsvTableSink</li>
 *   <li>-sqlStatement: SQL statement that is executed via executeSql</li>
 * </ul>
 */
public class BatchSQLTestProgram {

	/**
	 * Executes the SQL statement passed on the command line and blocks until its job has finished.
	 *
	 * <p>Before the statement is executed, two generated sources ({@code table1}, {@code table2})
	 * and one CSV sink ({@code sinkTable}, columns {@code f0} and {@code f1}) are registered in a
	 * batch-mode table environment.
	 *
	 * @param args program arguments; the {@code outputPath} and {@code sqlStatement} parameters
	 *     are required
	 * @throws IllegalStateException if executing the statement did not yield a job client, i.e.
	 *     there is no job to wait for
	 * @throws Exception if executing the statement or waiting for the job result fails
	 */
	public static void main(String[] args) throws Exception {
		ParameterTool params = ParameterTool.fromArgs(args);
		String outputPath = params.getRequired("outputPath");
		String sqlStatement = params.getRequired("sqlStatement");

		TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
			.useBlinkPlanner()
			.inBatchMode()
			.build());

		// The registration methods below are invoked through TableEnvironmentInternal, so cast once.
		TableEnvironmentInternal internalEnv = (TableEnvironmentInternal) tEnv;
		internalEnv.registerTableSourceInternal("table1", new GeneratorTableSource(10, 100, 60, 0));
		internalEnv.registerTableSourceInternal("table2", new GeneratorTableSource(5, 0.2f, 60, 5));
		internalEnv.registerTableSinkInternal("sinkTable",
			new CsvTableSink(outputPath)
				.configure(new String[]{"f0", "f1"}, new TypeInformation<?>[]{Types.INT, Types.SQL_TIMESTAMP}));

		TableResult result = tEnv.executeSql(sqlStatement);
		// Wait for the job to finish. The job client is optional; fail with a clear message
		// instead of a bare "No value present" when the statement did not submit a job.
		result.getJobClient()
			.orElseThrow(() -> new IllegalStateException(
				"The statement did not submit a job, so there is nothing to wait for: " + sqlStatement))
			.getJobExecutionResult()
			.get();
	}

	/**
	 * TableSource for generated data.
	 */
	public static class GeneratorTableSource extends InputFormatTableSource<Row> {

		private final int numKeys;
		private final float recordsPerKeyAndSecond;
		private final int durationSeconds;
		private final int offsetSeconds;

		GeneratorTableSource(int numKeys, float recordsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
			this.numKeys = numKeys;
			this.recordsPerKeyAndSecond = recordsPerKeyAndSecond;
			this.durationSeconds = durationSeconds;
			this.offsetSeconds = offsetSeconds;
		}

		@Override
		public InputFormat<Row, ?> getInputFormat() {
			return new IteratorInputFormat<>(
				DataGenerator.create(numKeys, recordsPerKeyAndSecond, durationSeconds, offsetSeconds));
		}

		@Override
		public DataType getProducedDataType() {
			return getTableSchema().toRowDataType();
		}

		@Override
		public TableSchema getTableSchema() {
			return TableSchema.builder()
				.field("key", DataTypes.INT())
				.field("rowtime", DataTypes.TIMESTAMP(3))
				.field("payload", DataTypes.STRING())
				.build();
		}
	}

	/**
	 * Serializable iterator that deterministically generates the rows of a test table.
	 *
	 * <p>The generator keeps an internal time that starts at {@code 0} and advances by
	 * {@code stepMs} until it reaches {@code durationMs} (exclusive). For a positive
	 * {@code numKeys}, each time step emits one row per key, with keys running from {@code 0} to
	 * {@code numKeys - 1}. A row consists of the key, the internal time plus {@code offsetMs}
	 * interpreted as epoch milliseconds in UTC, and a constant payload string.
	 */
	public static class DataGenerator implements Iterator<Row>, Serializable {
		private static final long serialVersionUID = 1L;

		final int numKeys;

		// Key of the next row to emit within the current time step.
		private int keyIndex = 0;

		private final long durationMs;
		private final long stepMs;
		private final long offsetMs;
		// Internal time of the current step, in milliseconds.
		private long ms = 0;

		/**
		 * Creates a generator from a per-key row rate and second-based duration and offset.
		 *
		 * @param numKeys number of keys emitted per time step
		 * @param rowsPerKeyAndSecond rows per key and second; the step is {@code 1000 / rate}
		 *     truncated to whole milliseconds
		 * @param durationSeconds length of the generated time range in seconds
		 * @param offsetSeconds offset applied to every generated timestamp
		 * @return the configured generator
		 * @throws IllegalArgumentException if the duration is positive but the resulting step is
		 *     not (rate above 1000, negative or NaN), because such a generator would never finish
		 */
		static DataGenerator create(int numKeys, float rowsPerKeyAndSecond, int durationSeconds, int offsetSeconds) {
			// The cast truncates towards zero, so rates above 1000 (and NaN) yield a step of 0.
			int stepMs = (int) (1000 / rowsPerKeyAndSecond);
			// Multiply as long so that large durations do not overflow int before widening.
			// NOTE(review): the offset is scaled by 2000 ms per second, not 1000. Left untouched
			// because changing it would shift every generated timestamp - confirm the intent.
			return new DataGenerator(numKeys, durationSeconds * 1000L, stepMs, offsetSeconds * 2000L);
		}

		/**
		 * Creates a generator from millisecond-based settings.
		 *
		 * @param numKeys number of keys emitted per time step
		 * @param durationMs exclusive upper bound of the internal time
		 * @param stepMs increment of the internal time after all keys of a step were emitted
		 * @param offsetMs value added to the internal time to form the row timestamp
		 * @throws IllegalArgumentException if {@code durationMs} is positive and {@code stepMs}
		 *     is not, because the internal time could then never reach {@code durationMs}
		 */
		DataGenerator(int numKeys, long durationMs, long stepMs, long offsetMs) {
			// Only reject the combination that cannot terminate; an empty generator
			// (non-positive duration) stays valid regardless of the step.
			if (durationMs > 0 && stepMs <= 0) {
				throw new IllegalArgumentException(
					"stepMs must be positive for a positive duration, but was " + stepMs
						+ "; the generator would never terminate");
			}
			this.numKeys = numKeys;
			this.durationMs = durationMs;
			this.stepMs = stepMs;
			this.offsetMs = offsetMs;
		}

		/** Returns {@code true} while the internal time is below {@code durationMs}. */
		@Override
		public boolean hasNext() {
			return ms < durationMs;
		}

		/**
		 * Returns the next generated row and advances the key, and after the last key of a step
		 * also the internal time.
		 *
		 * @throws NoSuchElementException if the generator is exhausted
		 */
		@Override
		public Row next() {
			if (!hasNext()) {
				throw new NoSuchElementException();
			}
			Row row = Row.of(
				keyIndex,
				LocalDateTime.ofInstant(Instant.ofEpochMilli(ms + offsetMs), ZoneOffset.UTC),
				"Some payload...");
			++keyIndex;
			// All keys of this time step were emitted: start over with key 0 at the next step.
			if (keyIndex >= numKeys) {
				keyIndex = 0;
				ms += stepMs;
			}
			return row;
		}
	}
}
